package com.hzya.frame;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Tags({"zjrc", "bank", "api", "财资宝", "农商行", "pagination"})
@CapabilityDescription("调用浙江农商联合银行财资宝的通用接口处理器，并自动处理分页查询。通过配置'交易码'属性来指定调用的具体接口。处理器接收一个包含reqBody的JSON作为输入FlowFile。")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute = "", description = "")})
@WritesAttributes({@WritesAttribute(attribute = "zjrc.api.error.message", description = "当API调用失败或银行返回错误时，此处会包含错误信息。"), @WritesAttribute(attribute = "zjrc.api.retCode", description = "银行返回的业务代码，例如 '0000'。"), @WritesAttribute(attribute = "zjrc.api.total.pages.fetched", description = "成功完成分页查询后，记录总共获取了多少页的数据。")})
public class ZJRCGenericApiProcessor extends AbstractProcessor {

    // --- 新增超时属性 ---
    public static final PropertyDescriptor PROP_CONNECT_TIMEOUT = new PropertyDescriptor.Builder().name("连接超时时间").displayName("连接超时时间").description("建立HTTP连接的超时时间。").defaultValue("10 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();

    public static final PropertyDescriptor PROP_READ_TIMEOUT = new PropertyDescriptor.Builder().name("读取超时时间").displayName("读取超时时间").description("等待HTTP响应的超时时间。").defaultValue("30 s").required(true).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();

    // --- 已有属性保持不变 ---
    public static final PropertyDescriptor PROP_TRAN_CODE = new PropertyDescriptor.Builder().name("交易码 (tranCode)").displayName("交易码 (tranCode)").description("需要调用的接口交易码，例如 300004 (账户交易明细查询)。").required(true).defaultValue("300004").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    public static final PropertyDescriptor PROP_CLIENT_URL = new PropertyDescriptor.Builder().name("财资宝前置客户端URL").displayName("财资宝前置客户端URL").description("财资宝前置客户端的URL地址，不包含任何查询参数，例如：http://115.231.255.202:8433/ERPClient/receive").required(true).addValidator(StandardValidators.URL_VALIDATOR).build();

    public static final PropertyDescriptor PROP_CUST_NO = new PropertyDescriptor.Builder().name("企业客户号 (custNo)").displayName("企业客户号 (custNo)").description("银行分配的企业网银客户号。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    public static final PropertyDescriptor PROP_USER_ID = new PropertyDescriptor.Builder().name("用户号 (userId)").displayName("用户号 (userId)").description("虚拟用户ID，通常以前缀'Y'开头。").required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("API调用成功且所有分页数据成功获取后，包含合并结果的FlowFile将被路由到此。").build();

    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("API调用失败或银行返回非'0000'错误码的FlowFile将被路由到此。").build();

    private List<PropertyDescriptor> descriptors;
    private Set<Relationship> relationships;
    private final ObjectMapper objectMapper = new ObjectMapper();

    // --- 核心修改点：将“查询结束”的错误码定义为常量 ---
    private static final String END_OF_PAGES_RET_CODE = "CTM0000006";

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(PROP_CLIENT_URL);
        descriptors.add(PROP_CUST_NO);
        descriptors.add(PROP_USER_ID);
        descriptors.add(PROP_TRAN_CODE);
        descriptors.add(PROP_CONNECT_TIMEOUT);
        descriptors.add(PROP_READ_TIMEOUT);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {
            final StringBuilder reqBodyBuilder = new StringBuilder();
            try (InputStream in = session.read(flowFile)) {
                reqBodyBuilder.append(new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n")));
            }
            ObjectNode reqBodyTemplate = (ObjectNode) objectMapper.readTree(reqBodyBuilder.toString());

            int currentPage = 1;
            List<JsonNode> allDetails = new ArrayList<>();

            while (true) {
                ObjectNode bizContent = (ObjectNode) reqBodyTemplate.path("bizContent");
                if (bizContent.isMissingNode()) {
                    bizContent = reqBodyTemplate;
                }
                bizContent.put("current", currentPage);

                String responseBody = sendRequest(context, reqBodyTemplate);
                JsonNode responseJson = objectMapper.readTree(responseBody);

                JsonNode resHead = responseJson.path("resHead");
                String retCode = resHead.path("retCode").asText();

                // --- 核心改造点：更智能的分页终止逻辑 ---
                if ("0000".equals(retCode)) {
                    // 正常成功返回
                    JsonNode resBody = responseJson.path("resBody");
                    JsonNode contentNode = resBody.path("content");
                    if (resBody.isNull() || contentNode.isNull() || contentNode.isMissingNode() || contentNode.asText().isEmpty()) {
                        getLogger().info("在第 {} 页未获取到content内容，分页结束。总共获取了 {} 页数据。", currentPage, currentPage - 1);
                        break; // 终止条件1：内容为空
                    }

                    String contentStr = contentNode.asText();
                    JsonNode contentJson = objectMapper.readTree(contentStr);
                    JsonNode detailsList = contentJson.path("detailsResBodyList");
                    if (detailsList.isArray()) {
                        for (JsonNode detail : detailsList) {
                            allDetails.add(detail);
                        }
                    }
                } else if (END_OF_PAGES_RET_CODE.equals(retCode)) {
                    // 将特定的错误码视为分页结束的信号
                    getLogger().info("在第 {} 页收到错误码 {}，判定为分页已结束。", currentPage, retCode);
                    break; // 终止条件2：收到“已查尽”的特定错误码
                } else {
                    // 其他所有错误码均视为硬性失败
                    String errMsg = resHead.path("errMsg").asText("未知错误");
                    getLogger().error("银行API在第 {} 页返回不可恢复的错误: retCode={}, errMsg={}, FlowFile: {}", currentPage, retCode, errMsg, flowFile.getId());
                    flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", "Page " + currentPage + ": " + errMsg);
                    flowFile = session.putAttribute(flowFile, "zjrc.api.retCode", retCode);
                    session.transfer(flowFile, REL_FAILURE);
                    return; // 终止整个流程
                }

                currentPage++;
            }

            ObjectNode finalResult = objectMapper.createObjectNode();
            ArrayNode finalArray = objectMapper.valueToTree(allDetails);
            finalResult.set("allTransactionDetails", finalArray);

            flowFile = session.write(flowFile, out -> objectMapper.writerWithDefaultPrettyPrinter().writeValue(out, finalResult));
            flowFile = session.putAttribute(flowFile, "zjrc.api.total.pages.fetched", String.valueOf(currentPage - 1));
            session.transfer(flowFile, REL_SUCCESS);

        } catch (Exception e) {
            getLogger().error("处理FlowFile时发生未知异常: {}", e.getMessage(), e);
            flowFile = session.putAttribute(flowFile, "zjrc.api.error.message", e.getMessage());
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private String sendRequest(final ProcessContext context, final JsonNode reqBodyNode) throws Exception {
        final String baseClientUrl = context.getProperty(PROP_CLIENT_URL).getValue();
        final String custNo = context.getProperty(PROP_CUST_NO).getValue();
        final String userId = context.getProperty(PROP_USER_ID).getValue();
        final String tranCode = context.getProperty(PROP_TRAN_CODE).getValue();
        final int connectTimeout = context.getProperty(PROP_CONNECT_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
        final int readTimeout = context.getProperty(PROP_READ_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();

        ObjectNode rootNode = objectMapper.createObjectNode();
        ObjectNode reqHead = objectMapper.createObjectNode();
        Date now = new Date();

        reqHead.put("custNo", custNo);
        reqHead.put("userId", userId);
        reqHead.put("tranCode", tranCode);
        reqHead.put("serialNo", UUID.randomUUID().toString().replace("-", ""));
        reqHead.put("reqDate", new SimpleDateFormat("yyyyMMdd").format(now));
        reqHead.put("reqTime", new SimpleDateFormat("HHmmssSSS").format(now));

        rootNode.set("reqHead", reqHead);
        rootNode.set("reqBody", reqBodyNode);

        String jsonPayload = objectMapper.writeValueAsString(rootNode);
        String encodedJson = URLEncoder.encode(jsonPayload, "UTF-8");
        String finalUrl = baseClientUrl + "?reqJson=" + encodedJson;

        URL url = new URL(finalUrl);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("POST");
        connection.setConnectTimeout(connectTimeout);
        connection.setReadTimeout(readTimeout);
        connection.setRequestProperty("Accept", "*/*");
        connection.setDoOutput(true);

        try (OutputStream os = connection.getOutputStream()) {
            os.write("".getBytes());
        }

        int responseCode = connection.getResponseCode();
        if (responseCode == HttpURLConnection.HTTP_OK) {
            try (InputStream is = connection.getInputStream()) {
                return new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n"));
            }
        } else {
            try (InputStream es = connection.getErrorStream()) {
                // 即便HTTP>400，也需要返回body，因为里面可能包含retCode
                if (es != null) {
                    return new BufferedReader(new InputStreamReader(es, StandardCharsets.UTF_8)).lines().collect(Collectors.joining("\n"));
                } else {
                    throw new IOException("HTTP请求失败，状态码: " + responseCode + "，且没有错误信息返回。");
                }
            }
        }
    }
}